Skip to content

feat(import): add async CSV import queue with job status endpoint#1708

Open
ClemRz wants to merge 6 commits into
developfrom
feat/async-csv-import-queue
Open

feat(import): add async CSV import queue with job status endpoint#1708
ClemRz wants to merge 6 commits into
developfrom
feat/async-csv-import-queue

Conversation

@ClemRz

@ClemRz ClemRz commented Jun 27, 2026

Copy link
Copy Markdown
Contributor

🤔 What

Converts the synchronous POST /api/v1/entrances/import-rows endpoint into an async queue-based pipeline and adds a new GET /api/v1/jobs/:batchId endpoint for progress polling.

  • Refactor import-rows to return 202 with a batch ID immediately
  • Add CSVImportQueueService with pg-boss backed chunk processing
  • Add TJobBatch model and t_job_batch SQL migration
  • Add job status endpoint with ownership/admin access control
  • Generate CSV reports (success/duplicates/failures) uploaded to Azure Blob
  • Send email notification on import completion (translated in 15 locales)
  • Add affinity-based chunking to keep duplicate-key rows in the same chunk

Closes #1574

🤷‍♂️ Why

The synchronous endpoint times out on large imports (thousands of rows). Moving to async processing:

  • Unblocks the HTTP request immediately (202 response)
  • Processes rows in parallel across 4 workers
  • Provides real-time progress feedback via polling
  • Generates downloadable CSV reports for successes/duplicates/failures
  • Notifies the user by email when done

🔍 How

Architecture:

  1. Controller validates the payload (mandatory columns on first row) and enqueues the batch
  2. CSVImportQueueService.createBatch() splits rows into chunks using affinity-based grouping (rows sharing the same dedup key always land in the same chunk)
  3. Each chunk is a pg-boss job processed by one of 4 concurrent workers (teamSize: 1, teamConcurrency: 4)
  4. After each chunk completes, a separate completion-check job fires to avoid a race condition (job state must be committed before querying)
  5. When all chunks are done, results are aggregated, CSV reports uploaded to Azure, and the user is notified

Key design decisions:

  • teamSize: 1 so pg-boss stores the handler return value as job output (with teamSize > 1, output is discarded by pg-boss v12)
  • Completion check runs as a separate queue to avoid reading stale job state
  • ENTRANCE_MANDATORY_COLUMNS extracted to csvHelper.js as single source of truth
  • sails.services.coordinatessnapshotservice used instead of require() to get the Sails-loaded instance

🧪 Testing

# Run unit + integration tests
npm run test -- --grep "CSVImportQueueService"
npm run test -- --grep "Job features"
npm run test -- --grep "Import rows"

# End-to-end smoke test (requires running dev server + Docker)
./scripts/test-async-import.sh

Tested locally with 2000 rows (1700 fresh + 300 duplicates) — completes in ~170s with correct classification (1700 successes, 300 duplicates, 0 failures).

📸 Previews

Polling response (in progress):

{
  "batchId": "6e5f37de-...",
  "status": "active",
  "progress": { "totalChunks": 41, "completedChunks": 20, "processedRows": 980 }
}

Completed response:

{
  "status": "completed",
  "progress": { "totalRows": 2000, "processedRows": 2000, "successes": 1700, "duplicates": 300, "failures": 0 },
  "result": { "reportUrls": { "success": "https://...", "duplicates": "https://..." }, "summary": { "successes": 1700, "duplicates": 300, "failures": 0 } }
}

@ClemRz ClemRz self-assigned this Jun 27, 2026
@ClemRz ClemRz requested a review from Paul-AUB June 27, 2026 22:31
@ClemRz

ClemRz commented Jun 27, 2026

Copy link
Copy Markdown
Contributor Author

Front-end Integration Guide

The CSV import endpoint has changed from synchronous (200 with results) to asynchronous (202 with batch ID + polling). Here's what needs to change in the front-end:

API Changes

Before (synchronous):

POST /api/v1/entrances/import-rows → 200 { successfulImport: [], failureImport: [], total: {...} }

After (asynchronous):

POST /api/v1/entrances/import-rows → 202 { batchId, totalRows, totalChunks, statusUrl }
GET /api/v1/jobs/:batchId           → 200 { batchId, type, status, progress, result }

Implementation Steps

  1. Submit the importPOST /api/v1/entrances/import-rows with { data: [...rows] }. Expect a 202 response:

    {
      "batchId": "uuid",
      "totalRows": 2000,
      "totalChunks": 41,
      "statusUrl": "/api/v1/jobs/uuid"
    }
  2. Poll for progressGET /api/v1/jobs/:batchId until status is "completed" or "failed":

    {
      "batchId": "uuid",
      "type": "csv-import",
      "status": "active",       // "pending" | "active" | "completed" | "failed"
      "progress": {
        "totalChunks": 41,
        "completedChunks": 20,
        "totalRows": 2000,
        "processedRows": 980,
        "successes": 900,
        "duplicates": 50,
        "failures": 30
      },
      "result": null             // populated only when status = "completed"
    }
  3. Show results — When status === "completed", result contains:

    {
      "reportUrls": {
        "success": "https://...signed-url...",    // null if no successes
        "duplicates": "https://...signed-url...", // null if no duplicates
        "failures": "https://...signed-url..."    // null if no failures
      },
      "summary": {
        "successes": 1700,
        "duplicates": 300,
        "failures": 0
      }
    }
  4. Handle errors:

    • 400 — empty data array or missing mandatory columns (immediate, no batch created)
    • 403 — user is not admin
    • 500 — job queue unavailable or batch creation failed
    • status: "failed" — all chunks failed during processing

UX Recommendations

  • Show a progress bar using processedRows / totalRows (or completedChunks / totalChunks for chunk granularity)
  • Poll every 2–3 seconds
  • When complete, show the summary and offer download links for the report CSVs (if URLs are non-null)
  • Report URLs are SAS-signed and expire after 7 days
  • The user also gets an email notification with the same summary and links

Access Control

  • Only the batch initiator can view their own job status
  • Moderators and Admins can view any batch
  • Non-authorized users get 403

New Notification Type: IMPORT_COMPLETE

This PR adds a new notification type IMPORT_COMPLETE (id: 8) to t_notification_type. The front-end notification system needs to handle this new type:

  • When: Sent to the user who initiated the CSV import batch, once all chunks have been processed
  • Display: Should render as an in-app notification (e.g., "Your CSV import has completed") with a link to view the results/download reports
  • Distinction from existing types: Unlike CREATE/UPDATE/DELETE which relate to entity CRUD operations, IMPORT_COMPLETE is a job-completion event — it won't have the usual entity link (cave, entrance, etc.) but instead relates to a batch job

@ClemRz

ClemRz commented Jun 27, 2026

Copy link
Copy Markdown
Contributor Author

Dev Environment: Local File Storage

This PR also includes a local file storage mechanism for development (commit feat(storage): add local file storage for dev environment).

What it does

When AZURE_KEY is not set (local dev), file uploads are stored on disk in .local-uploads/ instead of Azure Blob Storage — mirroring how the email helper logs to console instead of sending via SES.

How it works

  • Documents & reports are written to .local-uploads/documents/
  • DB exports are written to .local-uploads/db-exports/
  • Files are served back via a /local-uploads/* middleware on the local server
  • URLs in API responses point to http://localhost:<port>/local-uploads/...
  • .local-uploads/ is gitignored
  • nodemon.json whitelists only api/, config/, and app.js to prevent restarts when files are written

No changes needed to test locally

Just run npm run dev without AZURE_KEY — uploads work automatically. No more "Missing credential" warnings for document operations.

@Paul-AUB Paul-AUB left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a well-structured PR with a clear design rationale. The separation between chunk processing, completion checking, and aggregation is clean, and the affinity-based chunking to preserve duplicate detection is a solid approach. A few issues need addressing before merge.

Scope note: 2760 additions; analysis focused on CSVImportQueueService.js, the controller, model, migration, and config. The test files and Swagger doc were reviewed at a higher level.


Issues (Must Fix)

  1. [api/services/CSVImportQueueService.js:377–403] checkBatchCompletion is not idempotent, creating a duplicate-notification window. When the last two chunks complete at nearly the same time, both enqueue a completion-check job before either runs. The completion queue processes them sequentially, but both see allDone = true (all chunks are already committed to pgboss.job as completed). Both then call aggregateBatch, which uploads reports, creates a TNotification, and sends an email — resulting in the user receiving two emails and two in-app notifications. Fix: add an atomic test-and-set guard before calling aggregateBatch:

    const guard = await TJobBatch.updateOne({ id: batchId, status: 'active' }).set({ status: 'aggregating' });
    if (!guard) return; // another worker already took this
    await module.exports.aggregateBatch(batchId, jobs);

    This requires adding 'aggregating' to the isIn validator and the SQL CHECK constraint (or using a DB enum).

  2. [CODE_REVIEW.md] This self-review file should not be merged into the repository. It lists some items as open issues that the final implementation actually addresses (e.g., the race condition is mitigated via the completion queue), making the document misleading. The relevant discussion belongs in PR comments or the linked issue, not in version-controlled source. Remove this file before merging.


Suggestions (Should Consider)

  1. [api/services/CSVImportQueueService.js:411–452] aggregateBatch has no error handling around the Azure upload step. If generateAndUploadReports throws (e.g., transient Azure failure), the exception propagates up through checkBatchCompletionprocessCompletionCheck, the completion-check job fails, and TJobBatch.status stays stuck at 'active' permanently — no retry, no error state, no notification. Consider wrapping the upload in try/catch and setting status: 'failed' on error so the user is informed:

    try {
      const reportUrls = await module.exports.generateAndUploadReports(...);
      // ... rest of aggregation
    } catch (err) {
      sails.log.error('CSVImportQueueService: aggregation failed:', err);
      await TJobBatch.updateOne({ id: batchId }).set({ status: 'failed', completedAt: new Date() });
    }
  2. [api/models/TJobBatch.js:1–77] Add autoUpdatedAt: false to the model. The model explicitly customizes createdAt (good practice for a table that uses snake_case column names), but doesn't opt out of Waterline's automatic updatedAt management. The SQL migration has no updated_at column. While schema: true in the global config may prevent Waterline from inserting the column, this relies on implicit behaviour that could break silently if the global config changes. Being explicit is safer and consistent with the pattern already used for createdAt.

  3. [api/services/CSVImportQueueService.js:597–599] The email subject 'CSV Import Complete' is hardcoded English. The locale parameter is correctly passed to sendEmail, so the EJS template body is fully translated, but the subject line will be English for all 15 locales. Add a locale-aware i18n key for the subject (e.g., __('csv_import_complete_subject')).


Nitpicks (Optional)

  1. [api/services/CSVImportQueueService.js:346–353] The outer catch in processOneChunk swallows unexpected chunk-level errors and returns an empty result ({ successes: [], duplicates: [], failures: [] }). From the user's perspective, rows in that chunk silently disappear — the completed batch will show fewer total rows than totalRows with no failure entries to explain the discrepancy. Consider pushing all remaining unprocessed rows as failures when the outer catch fires, or re-throwing so pg-boss can retry the job (given retryLimit: 3 is already configured).

  2. [scripts/test-async-import.sh:38] TOKEN_SALT || 'aR4nd0mT0kenSalt' provides a hardcoded fallback secret. Acceptable for a local dev script, but the fallback risks accidentally generating valid tokens against a non-dev environment if TOKEN_SALT is unset. Consider removing the fallback and requiring the variable explicitly.

@ClemRz

ClemRz commented Jun 28, 2026

Copy link
Copy Markdown
Contributor Author

Thanks for the thorough review @Paul-AUB! Pushed fixes in 14fb03f. Here's what was addressed:

Fixed:

  1. Race condition in checkBatchCompletion — Added an atomic test-and-set guard (updateOne({id, status:'active'}).set({status:'aggregating'})). Only the first completion-check worker transitions the batch; subsequent ones get null and bail out. Added 'aggregating' to the model validator and Swagger enum.

  2. CODE_REVIEW.md removed — Agreed, doesn't belong in version control.

  3. Error handling in aggregateBatch — Wrapped generateAndUploadReports in try/catch. On failure, batch transitions to 'failed' with a completedAt timestamp so the user knows something went wrong.

  4. Silent row loss in processOneChunk — The outer catch now pushes all remaining unprocessed rows as failures with the error message, so no rows silently vanish from batch results.

  5. TOKEN_SALT fallback removed — Script now exits with an error if the env var is unset.

Not changed (with rationale):

  1. autoUpdatedAt: false — This setting is not supported at the model level in Sails 1.0 (causes a boot error). The global config already opts out of auto-timestamps by not defining updatedAt in the global attributes block, so there's no risk of Waterline injecting the column.

  2. Email subject i18n — The sendEmail helper already passes emailSubject through i18n.__() (see api/helpers/send-email.js:95), and translations for 'CSV Import Complete' exist in all 15 locale files. No change needed.

@ClemRz ClemRz force-pushed the feat/async-csv-import-queue branch from 14fb03f to 3c92d5e Compare June 28, 2026 20:27
@ClemRz ClemRz requested a review from Paul-AUB June 29, 2026 16:26
ClemRz added 6 commits June 29, 2026 10:27
- Refactor POST /api/v1/entrances/import-rows to return 202 with batch ID
- Add CSVImportQueueService with pg-boss backed chunk processing
- Add affinity-based chunking to keep duplicate-key rows together
- Add TJobBatch model and t_job_batch migration for batch tracking
- Add GET /api/v1/jobs/:batchId endpoint for progress polling
- Generate CSV reports (success/duplicates/failures) uploaded to Azure Blob
- Send email notification on import completion
- Add IMPORT_COMPLETE notification type and t_notification FK
- Update Swagger spec for new endpoints and schemas
- Add unit, property-based, and DB integration tests

# Conflicts:
#	assets/swaggerV1.yaml
- Store uploads in .local-uploads/ when AZURE_KEY is not set
- Serve local files via /local-uploads/* middleware
- Add .local-uploads/ to .gitignore
- Update FileService tests for new local storage behavior
pg-boss v12 only stores the handler's return value as job output when
teamSize is 1. With teamSize > 1, multiple jobs are passed to a single
handler invocation and the output is discarded.

Switch to teamSize:1, teamConcurrency:4 to keep parallelism while
ensuring each job's result (successes/duplicates/failures) is persisted.
This fixes the progress endpoint always showing 0 processed rows.
Generates 2000 rows (85% fresh, 15% duplicates), submits via the API,
polls for completion, downloads reports, and prints a summary.
Useful for end-to-end validation of the async import pipeline.
Add proper translations for the csv-import-complete email template keys
in all 15 supported languages (ar, bg, ca, de, el, en, es, fr, he, id,
it, ja, nl, pt, ro).
- Add atomic test-and-set guard in checkBatchCompletion to prevent
  duplicate notifications when concurrent completion-check jobs fire
- Add error handling around generateAndUploadReports in aggregateBatch
  so batch transitions to 'failed' instead of staying stuck at 'active'
- Push remaining unprocessed rows as failures when outer catch fires
  in processOneChunk to avoid silent row loss
- Remove TOKEN_SALT hardcoded fallback in test-async-import.sh
- Remove CODE_REVIEW.md (does not belong in version control)
- Add 'aggregating' status to TJobBatch model and Swagger spec
@ClemRz ClemRz force-pushed the feat/async-csv-import-queue branch from 3c92d5e to d1c2a3e Compare June 29, 2026 16:28
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

feat(entrance): make CSV import asynchronous using pg-boss job queue

2 participants